Skip to content

Commit

Permalink
Merge pull request #283 from kolsys/master
Browse files Browse the repository at this point in the history
Add support for per-channel dump-from/dump-to limitation
  • Loading branch information
rusq committed Apr 12, 2024
2 parents 8bcedfa + 85262bd commit 3f3cd76
Show file tree
Hide file tree
Showing 8 changed files with 374 additions and 156 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ ZIPFILES=$(foreach s,$(OSES),$(OUTPUT)-$s.zip)
$(OUTPUT)-windows.zip: EXECUTABLE=$(OUTPUT).exe

$(foreach s,$(OSES),$(eval $(OUTPUT)-$s.zip: GOOS=$s))
$(foreach s,$(OSES),$(eval $(OUTPUT)-$s.zip: $(EXECUTABLE)))
$(foreach s,$(OSES),$(eval $(OUTPUT)-$s.zip: $(OUTPUT)))

$(foreach s,$(OSES),$(eval $(OUTPUT)-$s: GOOS=$s))
$(foreach s,$(OSES),$(eval $(OUTPUT)-$s: $(OUTPUT)))


all: $(EXECUTABLE)
Expand Down
5 changes: 3 additions & 2 deletions cmd/slackdump/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func Test_checkParameters(t *testing.T) {
slackdump.DefOptions.CacheDir = app.CacheDir()

// test
emptyEntityList, _ := structures.NewEntityList([]string{})
type args struct {
args []string
}
Expand All @@ -72,7 +73,7 @@ func Test_checkParameters(t *testing.T) {
},
FilenameTemplate: defFilenameTemplate,

Input: config.Input{List: &structures.EntityList{}},
Input: config.Input{List: emptyEntityList},
Output: config.Output{Filename: "-", Format: "text"},
Options: slackdump.DefOptions,
}},
Expand All @@ -93,7 +94,7 @@ func Test_checkParameters(t *testing.T) {
Users: true,
},
FilenameTemplate: defFilenameTemplate,
Input: config.Input{List: &structures.EntityList{}},
Input: config.Input{List: emptyEntityList},
Output: config.Output{Filename: "-", Format: "text"},
Options: slackdump.DefOptions,
}},
Expand Down
44 changes: 30 additions & 14 deletions export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,21 @@ func (se *Export) exclusiveExport(ctx context.Context, uidx structures.UserIndex

chans := make([]slack.Channel, 0)

listIdx := el.Index()
items := el.Index()
// we need the current user to be able to build an index of DMs.
if err := se.sd.StreamChannels(ctx, slackdump.AllChanTypes, func(ch slack.Channel) error {
if include, ok := listIdx[ch.ID]; ok && !include {
trace.Logf(ctx, "info", "skipping %s", ch.ID)
se.lg.Printf("skipping: %s", ch.ID)
return nil
item, ok := items[ch.ID]
if ok {
if !item.Include && item.Oldest.IsZero() && item.Latest.IsZero() {
trace.Logf(ctx, "info", "skipping %s", ch.ID)
se.lg.Printf("skipping: %s", ch.ID)
return nil
}
} else {
item = &structures.EntityItem{
Id: ch.ID,
Include: true,
}
}

var eg errgroup.Group
Expand All @@ -143,7 +151,7 @@ func (se *Export) exclusiveExport(ctx context.Context, uidx structures.UserIndex

// 2. export conversation
eg.Go(func() error {
if err := se.exportConversation(ctx, uidx, ch); err != nil {
if err := se.exportConversation(ctx, uidx, ch, item); err != nil {
return fmt.Errorf("error exporting conversation %s: %w", ch.ID, err)
}
return nil
Expand Down Expand Up @@ -177,13 +185,11 @@ func (se *Export) inclusiveExport(ctx context.Context, uidx structures.UserIndex

// preallocate, some channels might be excluded, so this is optimistic
// allocation
chans := make([]slack.Channel, 0, len(list.Include))

elIdx := list.Index()
chans := make([]slack.Channel, 0, len(list.Index()))

// we need the current user to be able to build an index of DMs.
for _, entry := range list.Include {
if include, ok := elIdx[entry]; ok && !include {
for entry, item := range list.Index() {
if !item.Include && item.Oldest.IsZero() && item.Latest.IsZero() {
se.td(ctx, "info", "skipping %s", entry)
se.lg.Printf("skipping: %s", entry)
continue
Expand All @@ -210,7 +216,7 @@ func (se *Export) inclusiveExport(ctx context.Context, uidx structures.UserIndex
})

eg.Go(func() error {
if err := se.exportConversation(ctx, uidx, *ch); err != nil {
if err := se.exportConversation(ctx, uidx, *ch, item); err != nil {
return fmt.Errorf("error exporting conversation %s: %w", ch.ID, err)
}
return nil
Expand All @@ -229,11 +235,21 @@ func (se *Export) inclusiveExport(ctx context.Context, uidx structures.UserIndex
}

// exportConversation exports one conversation.
func (se *Export) exportConversation(ctx context.Context, userIdx structures.UserIndex, ch slack.Channel) error {
func (se *Export) exportConversation(ctx context.Context, userIdx structures.UserIndex, ch slack.Channel, exportItem *structures.EntityItem) error {
ctx, task := trace.NewTask(ctx, "export.conversation")
defer task.End()

messages, err := se.sd.DumpRaw(ctx, ch.ID, se.opts.Oldest, se.opts.Latest, se.dl.ProcessFunc(validName(ch)))
eOldest := se.opts.Oldest
eLatest := se.opts.Latest

if !exportItem.Oldest.IsZero() {
eOldest = exportItem.Oldest
}
if !exportItem.Latest.IsZero() {
eLatest = exportItem.Latest
}

messages, err := se.sd.DumpRaw(ctx, ch.ID, eOldest, eLatest, se.dl.ProcessFunc(validName(ch)))
if err != nil {
return fmt.Errorf("failed to dump %q (%s): %w", ch.Name, ch.ID, err)
}
Expand Down
3 changes: 2 additions & 1 deletion export/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,8 @@ func TestExport_exportConversation(t *testing.T) {
Return(tt.mocks.rets.closeErr)
}
}
if err := exp.exportConversation(context.Background(), testUserIdx, tt.args.ch); (err != nil) != tt.wantErr {
exportItem := &structures.EntityItem{Id: tt.args.ch.ID}
if err := exp.exportConversation(context.Background(), testUserIdx, tt.args.ch, exportItem); (err != nil) != tt.wantErr {
t.Errorf("Export.exportConversation() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down
16 changes: 9 additions & 7 deletions internal/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,18 @@ func (in *Input) IsValid() bool {

// listProducer iterates over the included entries of the input list's
// index, and calls fn for each entry.
func (in *Input) listProducer(fn func(string) error) error {
func (in *Input) listProducer(fn func(*structures.EntityItem) error) error {
if !in.List.HasIncludes() {
return ErrInvalidInput
}
for _, entry := range in.List.Include {
if err := fn(entry); err != nil {
if errors.Is(err, ErrSkip) {
continue
for _, entry := range in.List.Index() {
if entry.Include {
if err := fn(entry); err != nil {
if errors.Is(err, ErrSkip) {
continue
}
return err
}
return err
}
}
return nil
Expand Down Expand Up @@ -192,7 +194,7 @@ func (p *Params) compileValidateTemplate() error {

// Producer iterates over the list or reads the list from the file and calls
// fn for each entry.
func (in Input) Producer(fn func(string) error) error {
func (in Input) Producer(fn func(*structures.EntityItem) error) error {
if !in.IsValid() {
return ErrInvalidInput
}
Expand Down
21 changes: 16 additions & 5 deletions internal/app/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ func (app *dump) Dump(ctx context.Context) (int, error) {
}

total := 0
if err := app.cfg.Input.Producer(func(channelID string) error {
if err := app.dumpOne(ctx, fs, tmpl, channelID, app.sess.Dump); err != nil {
app.log.Printf("error processing: %q (conversation will be skipped): %s", channelID, err)
if err := app.cfg.Input.Producer(func(channelItem *structures.EntityItem) error {
if err := app.dumpOne(ctx, fs, tmpl, channelItem, app.sess.Dump); err != nil {
app.log.Printf("error processing: %q (conversation will be skipped): %s", channelItem.Id, err)
return config.ErrSkip
}
total++
Expand All @@ -116,8 +116,19 @@ func renderFilename(tmpl *template.Template, c *types.Conversation) string {

// dumpOne dumps just one channel specified by channelItem. If
// generateText is true, it will also generate a ID.txt text file.
func (app *dump) dumpOne(ctx context.Context, fs fsadapter.FS, filetmpl *template.Template, channelInput string, fn dumpFunc) error {
cnv, err := fn(ctx, channelInput, time.Time(app.cfg.Oldest), time.Time(app.cfg.Latest))
func (app *dump) dumpOne(ctx context.Context, fs fsadapter.FS, filetmpl *template.Template, channelItem *structures.EntityItem, fn dumpFunc) error {

eOldest := time.Time(app.cfg.Oldest)
eLatest := time.Time(app.cfg.Latest)

if !channelItem.Oldest.IsZero() {
eOldest = channelItem.Oldest
}
if !channelItem.Latest.IsZero() {
eLatest = channelItem.Latest
}

cnv, err := fn(ctx, channelItem.Id, eOldest, eLatest)
if err != nil {
return err
}
Expand Down
96 changes: 61 additions & 35 deletions internal/structures/entity_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"bufio"
"io"
"os"
"sort"
"strings"
"time"

"errors"
)
Expand All @@ -15,16 +15,26 @@ const (
// for export or when downloading conversations.
excludePrefix = "^"
filePrefix = "@"
timeSeparator = "|"
timeFmt = "2006-01-02T15:04:05"

// maxFileEntries is the maximum non-empty entries that will be read from
// the file. Who ever needs more than 64Ki channels.
maxFileEntries = 65536
)

// EntityItem describes a single entity (e.g. a channel) to process, along
// with optional per-entity time bounds for the dump.
type EntityItem struct {
Id string // entity ID (NOTE(review): Go convention would be "ID", but renaming would break callers)
Oldest time.Time // lower time bound; zero value means "fall back to the global Oldest option" (see exportConversation)
Latest time.Time // upper time bound; zero value means "fall back to the global Latest option"
Include bool // true to include the entity, false to exclude it
}

// EntityList is an Inclusion/Exclusion list of entities, indexed by entity
// ID. Each item may carry optional per-entity oldest/latest time bounds.
//
// NOTE(review): the scraped diff also showed the removed Include/Exclude
// []string fields interleaved here; only the new fields are kept, matching
// the methods below that read el.index/el.hasIncludes/el.hasExcludes.
type EntityList struct {
	index       map[string]*EntityItem
	hasIncludes bool
	hasExcludes bool
}

func HasExcludePrefix(s string) bool {
Expand All @@ -49,20 +59,20 @@ func NewEntityList(entities []string) (*EntityList, error) {
}

// MakeEntityList creates an EntityList from a slice of IDs or URLs (entities).
func LoadEntityList(filename string) (*EntityList, error) {
func loadEntityIndex(filename string) (map[string]bool, error) {
f, err := os.Open(filename)
if err != nil {
return nil, err
}
defer f.Close()
return readEntityList(f, maxFileEntries)
return readEntityIndex(f, maxFileEntries)
}

// readEntityIndex is a rather naïve implementation that reads the entire file up
// to maxEntries entities (empty lines are skipped), and populates the slice of
// strings, which is then passed to buildEntityIndex. On large lists it will
// probably use a silly amount of memory.
func readEntityList(r io.Reader, maxEntries int) (*EntityList, error) {
func readEntityIndex(r io.Reader, maxEntries int) (map[string]bool, error) {
br := bufio.NewReader(r)
var elements []string
var total = 0
Expand Down Expand Up @@ -92,44 +102,57 @@ func readEntityList(r io.Reader, maxEntries int) (*EntityList, error) {

total++
}
return NewEntityList(elements)
return buildEntityIndex(elements)
}

// getTimeTuple splits an entity specification of the form
// "id|oldest|latest" into at most three elements. File-list entries
// (prefixed with filePrefix) never carry time bounds, so they are
// returned whole as a single-element slice.
func getTimeTuple(spec string) []string {
	if !strings.HasPrefix(spec, filePrefix) {
		return strings.SplitN(spec, timeSeparator, 3)
	}
	return []string{spec}
}

// fromIndex populates the entity list from an index that maps entity spec
// strings (possibly carrying "id|oldest|latest" time bounds) to their
// include (true) / exclude (false) flag, and records whether the list has
// any includes or excludes.
//
// NOTE(review): the scraped diff interleaved the removed implementation
// (appends to el.Include/el.Exclude and sort.Strings calls) with this one;
// only the new logic is kept, consistent with the new EntityList fields.
func (el *EntityList) fromIndex(index map[string]bool) {
	el.index = make(map[string]*EntityItem, len(index))
	for ent, include := range index {
		parts := getTimeTuple(ent)

		item := &EntityItem{
			Id:      parts[0],
			Include: include,
		}
		// Time bounds are optional and best-effort: a parse failure leaves
		// the zero value, which downstream code treats as "no bound".
		if len(parts) > 1 {
			item.Oldest, _ = time.Parse(timeFmt, parts[1])
		}
		if len(parts) == 3 {
			item.Latest, _ = time.Parse(timeFmt, parts[2])
		}

		el.index[item.Id] = item
		if include {
			el.hasIncludes = true
		} else {
			el.hasExcludes = true
		}
	}
}

// Index returns a map where key is entity, and value show if the entity
// should be processed (true) or not (false).
func (el *EntityList) Index() map[string]bool {
var idx = make(map[string]bool, len(el.Include)+len(el.Exclude))
for _, v := range el.Include {
idx[v] = true
}
for _, v := range el.Exclude {
idx[v] = false
}
return idx
func (el *EntityList) Index() map[string]*EntityItem {
return el.index
}

// HasIncludes reports whether the list contains at least one entity to
// include.
func (el *EntityList) HasIncludes() bool {
	return el.hasIncludes
}

// HasExcludes reports whether the list contains at least one entity to
// exclude.
func (el *EntityList) HasExcludes() bool {
	return el.hasExcludes
}

// IsEmpty reports whether the list contains no entities at all.
func (el *EntityList) IsEmpty() bool {
	return len(el.index) == 0
}

func buildEntityIndex(entities []string) (map[string]bool, error) {
Expand All @@ -141,38 +164,41 @@ func buildEntityIndex(entities []string) (map[string]bool, error) {
if ent == "" {
continue
}
parts := getTimeTuple(ent)
switch {
case HasExcludePrefix(ent):
trimmed := strings.TrimPrefix(ent, excludePrefix)
case HasExcludePrefix(parts[0]):
trimmed := strings.TrimPrefix(parts[0], excludePrefix)
if trimmed == "" {
continue
}
sl, err := ParseLink(trimmed)
if err != nil {
return nil, err
}
excluded = append(excluded, sl.String())
case hasFilePrefix(ent):
trimmed := strings.TrimPrefix(ent, filePrefix)
parts[0] = sl.String()
excluded = append(excluded, strings.Join(parts, timeSeparator))
case hasFilePrefix(parts[0]):
trimmed := strings.TrimPrefix(parts[0], filePrefix)
if trimmed == "" {
continue
}
files = append(files, trimmed)
default:
sl, err := ParseLink(ent)
sl, err := ParseLink(parts[0])
if err != nil {
return nil, err
}
index[sl.String()] = true
parts[0] = sl.String()
index[strings.Join(parts, timeSeparator)] = true
}
}
// process files
for _, file := range files {
el, err := LoadEntityList(file)
index2, err := loadEntityIndex(file)
if err != nil {
return nil, err
}
for ent, include := range el.Index() {
for ent, include := range index2 {
if include {
index[ent] = true
} else {
Expand Down

0 comments on commit 3f3cd76

Please sign in to comment.